iT邦幫忙

2025 iThome 鐵人賽

DAY 17
0

Day 17: 並行處理與效能優化

今天我們要讓 AI 助理變得更快更高效!透過並行處理和效能優化技術,系統能夠同時執行多個任務,大幅縮短處理時間,提升使用者體驗。

⚡ 為什麼需要並行處理?

在實際應用中,許多任務可以同時執行:

  • 🔄 多任務處理:同時查詢天氣、新聞、股價
  • 📊 批次處理:一次處理多個文件或圖片
  • 🌐 API 併發呼叫:平行發送多個外部請求
  • 💾 資料處理:同時讀取和寫入資料

並行處理能將原本需要順序執行的任務改為同時進行,大幅提升效率。

🏗 專案結構

parallel_processing/
├── main.py                          # 主程式
├── core/
│   ├── __init__.py
│   ├── parallel_executor.py         # 並行執行器
│   └── task_pool.py                 # 任務池
├── optimizers/
│   ├── __init__.py
│   ├── cache_manager.py             # 快取管理器
│   └── batch_processor.py           # 批次處理器
├── workflows/
│   ├── __init__.py
│   └── parallel_workflow.py         # 並行工作流程
└── utils/
    ├── __init__.py
    └── performance_monitor.py       # 效能監控器

🔧 核心實作

1. 並行執行器 (core/parallel_executor.py)

import concurrent.futures
from typing import List, Dict, Any, Callable
import time
from dataclasses import dataclass
import threading

@dataclass
class ParallelTask:
    """A single unit of work for ParallelExecutor."""
    id: str  # key under which the outcome is stored in results/errors
    name: str  # human-readable label used in progress messages
    function: Callable  # invoked as function(*args, **kwargs)
    args: tuple = ()
    kwargs: dict = None  # None is replaced by a fresh {} in __post_init__
    # Seconds allowed for the task, measured from submission (time spent
    # queued behind other tasks counts). None means "no timeout".
    timeout: float = 30.0

    def __post_init__(self):
        # dataclasses reject a mutable {} default, so None is the default and
        # each instance gets its own empty dict here.
        if self.kwargs is None:
            self.kwargs = {}


class ParallelExecutor:
    """Run ParallelTask objects concurrently on a thread pool.

    Threads suit I/O-bound work such as remote API calls. Each task's timeout
    is enforced from the moment it is submitted; a timed-out task is reported
    as failed immediately, although a thread that is already running it cannot
    be interrupted and keeps running in the background until it returns.
    """

    def __init__(self, max_workers: int = 5):
        """
        Args:
            max_workers: Maximum number of worker threads per execute_parallel call.
        """
        self.max_workers = max_workers
        # Cumulative counters across every execute_parallel() call.
        self.execution_stats = {
            'total_tasks': 0,
            'successful_tasks': 0,
            'failed_tasks': 0,
            'total_time': 0.0
        }

    def execute_parallel(self, tasks: List[ParallelTask]) -> Dict[str, Any]:
        """Run *tasks* concurrently and collect their outcomes.

        Args:
            tasks: Tasks to run; their ``id`` values key the returned dicts.

        Returns:
            A dict with ``success`` (no task failed), ``results`` (id -> return
            value), ``errors`` (id -> error message, including timeouts),
            ``execution_time`` (wall-clock seconds), ``tasks_count`` and
            ``success_rate``. The same keys are present for an empty task list.
        """
        if not tasks:
            return {
                'success': True,
                'results': {},
                'errors': {},
                'execution_time': 0.0,
                'tasks_count': 0,
                'success_rate': 0
            }

        start_time = time.time()
        results = {}
        errors = {}

        print(f"🚀 開始並行執行 {len(tasks)} 個任務...")

        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
        future_to_task = {}
        try:
            submitted_at = time.monotonic()
            future_to_task = {
                executor.submit(self._execute_single_task, task): task
                for task in tasks
            }
            # Absolute (monotonic) deadline per future; inf = never times out.
            deadlines = {
                future: (submitted_at + task.timeout
                         if task.timeout is not None else float('inf'))
                for future, task in future_to_task.items()
            }

            pending = set(future_to_task)
            while pending:
                # Sleep until something finishes or the nearest deadline passes.
                next_deadline = min(deadlines[f] for f in pending)
                wait_for = (None if next_deadline == float('inf')
                            else max(0.0, next_deadline - time.monotonic()))
                done, pending = concurrent.futures.wait(
                    pending,
                    timeout=wait_for,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                )

                for future in done:
                    self._record_outcome(future_to_task[future], future, results, errors)

                now = time.monotonic()
                # A future that finished after wait() returned is not expired;
                # the next wait() call will hand it back as done.
                expired = {f for f in pending if not f.done() and deadlines[f] <= now}
                for future in expired:
                    future.cancel()  # only succeeds if the task has not started yet
                    task = future_to_task[future]
                    errors[task.id] = f"任務逾時 (>{task.timeout}秒)"
                    self.execution_stats['failed_tasks'] += 1
                    print(f"⏰ 任務逾時: {task.name}")
                pending -= expired
        finally:
            # Drop any still-queued work and return without waiting for threads
            # stuck in timed-out tasks (a `with` block would block on them).
            for future in future_to_task:
                future.cancel()
            executor.shutdown(wait=False)

        execution_time = time.time() - start_time

        self.execution_stats['total_tasks'] += len(tasks)
        self.execution_stats['total_time'] += execution_time

        print(f"⏱️ 並行執行完成,耗時: {execution_time:.2f} 秒")

        return {
            'success': len(errors) == 0,
            'results': results,
            'errors': errors,
            'execution_time': execution_time,
            'tasks_count': len(tasks),
            'success_rate': len(results) / len(tasks)
        }

    def _record_outcome(self, task: ParallelTask, future: concurrent.futures.Future,
                        results: Dict[str, Any], errors: Dict[str, str]) -> None:
        """Store a finished future's value or error under task.id and update counters."""
        try:
            result = future.result()
        except Exception as e:
            errors[task.id] = str(e)
            self.execution_stats['failed_tasks'] += 1
            print(f"❌ 任務失敗: {task.name} - {e}")
        else:
            results[task.id] = result
            self.execution_stats['successful_tasks'] += 1
            print(f"✅ 任務完成: {task.name}")

    def _execute_single_task(self, task: ParallelTask) -> Any:
        """Call the task's function; wrap any failure with a uniform message.

        Raises:
            Exception: "任務執行失敗: ..." chained to the original exception.
        """
        try:
            return task.function(*task.args, **task.kwargs)
        except Exception as e:
            raise Exception(f"任務執行失敗: {str(e)}") from e

    def get_stats(self) -> Dict[str, Any]:
        """Return the cumulative counters plus derived success rate and mean time per task."""
        total = self.execution_stats['total_tasks']

        return {
            **self.execution_stats,
            'success_rate': (
                self.execution_stats['successful_tasks'] / total
                if total > 0 else 0
            ),
            'average_time': (
                self.execution_stats['total_time'] / total
                if total > 0 else 0
            )
        }

2. 快取管理器 (optimizers/cache_manager.py)

from typing import Any, Optional, Callable
import time
import hashlib
import json
from functools import wraps

class CacheManager:
    """In-memory key/value cache with a per-entry time-to-live (TTL).

    Entries live in a dict guarded by a lock so the cache can be shared with
    worker threads. Expired entries are evicted lazily when they are looked up.
    """

    # Private marker that distinguishes "not cached" from a cached ``None``.
    _MISSING = object()

    def __init__(self, default_ttl: int = 300):
        """
        Args:
            default_ttl: Lifetime in seconds used when ``set`` gets no explicit ttl.
        """
        import threading  # local import: this module's header does not import threading

        self.cache = {}
        self.default_ttl = default_ttl  # default lifetime in seconds
        self.hit_count = 0
        self.miss_count = 0
        self._lock = threading.Lock()

    def _lookup(self, key: str) -> Any:
        """Return the live value for *key* or ``_MISSING``; updates hit/miss counters."""
        with self._lock:
            entry = self.cache.get(key)
            if entry is not None:
                if time.time() < entry['expires_at']:
                    self.hit_count += 1
                    print(f"💾 快取命中: {key}")
                    return entry['value']
                # Expired: evict while still holding the lock so two threads
                # expiring the same key cannot both try to delete it.
                del self.cache[key]
            self.miss_count += 1
            return self._MISSING

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or ``None`` on a miss or expiry.

        A cached ``None`` is indistinguishable from a miss through this method;
        the ``cached`` decorator uses an internal lookup that tells them apart.
        """
        value = self._lookup(key)
        return None if value is self._MISSING else value

    def set(self, key: str, value: Any, ttl: int = None) -> None:
        """Store *value* under *key* for *ttl* seconds (``default_ttl`` if ttl is None).

        An explicit ``ttl=0`` is honoured and makes the entry expire immediately.
        """
        if ttl is None:
            ttl = self.default_ttl
        now = time.time()

        with self._lock:
            self.cache[key] = {
                'value': value,
                'created_at': now,
                'expires_at': now + ttl
            }

        print(f"💾 快取已設定: {key} (TTL: {ttl}秒)")

    def invalidate(self, key: str) -> None:
        """Remove *key* from the cache if present; unknown keys are ignored."""
        with self._lock:
            removed = self.cache.pop(key, self._MISSING) is not self._MISSING
        if removed:
            print(f"🗑️ 快取已清除: {key}")

    def clear_all(self) -> None:
        """Remove every entry (hit/miss counters are kept)."""
        with self._lock:
            self.cache.clear()
        print("🗑️ 所有快取已清除")

    def get_stats(self) -> dict:
        """Return entry count, hit/miss counts and the hit rate (0 when unused)."""
        with self._lock:
            hits, misses, size = self.hit_count, self.miss_count, len(self.cache)
        total_requests = hits + misses
        hit_rate = hits / total_requests if total_requests > 0 else 0

        return {
            'cache_size': size,
            'hit_count': hits,
            'miss_count': misses,
            'hit_rate': hit_rate
        }

    def cached(self, ttl: int = None):
        """Decorator that memoises a function's results in this cache.

        Args:
            ttl: Lifetime of each memoised result; ``None`` uses ``default_ttl``.

        Results equal to ``None`` are cached too. Keys include the function's
        module and qualified name, so same-named functions do not collide.
        """
        def decorator(func: Callable):
            func_id = f"{func.__module__}.{func.__qualname__}"

            @wraps(func)
            def wrapper(*args, **kwargs):
                cache_key = self._generate_cache_key(func_id, args, kwargs)

                cached_value = self._lookup(cache_key)
                if cached_value is not self._MISSING:
                    return cached_value

                # Computed outside the lock: concurrent callers may both compute.
                result = func(*args, **kwargs)
                self.set(cache_key, result, ttl)
                return result

            return wrapper
        return decorator

    def _generate_cache_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Build a stable hash key from the function id and the repr of its arguments."""
        key_data = {
            'func': func_name,
            'args': str(args),
            'kwargs': str(sorted(kwargs.items()))
        }

        key_str = json.dumps(key_data, sort_keys=True)

        # MD5 is used only as a compact fingerprint, not for security.
        return hashlib.md5(key_str.encode()).hexdigest()

3. 批次處理器 (optimizers/batch_processor.py)

from typing import List, Any, Callable, Dict
import google.generativeai as genai
import os
import time

# Configure the Gemini SDK once at import time; the key is read from the
# GEMINI_API_KEY environment variable (None when it is not set).
genai.configure(api_key=os.environ.get('GEMINI_API_KEY'))

class BatchProcessor:
    """Process items in fixed-size batches, optionally via a Gemini model.

    Items inside a batch are processed one after another; an optional pause
    between batches eases API rate limits.
    """

    def __init__(self, batch_size: int = 5, batch_delay: float = 0.5,
                 model_name: str = 'gemini-2.0-flash-exp'):
        """
        Args:
            batch_size: Number of items per batch; must be positive.
            batch_delay: Seconds to sleep between consecutive batches.
            model_name: Gemini model used by ``batch_analyze_texts``.

        Raises:
            ValueError: If ``batch_size`` is not positive or ``batch_delay`` is negative.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        if batch_delay < 0:
            raise ValueError(f"batch_delay must be non-negative, got {batch_delay}")
        self.batch_size = batch_size
        self.batch_delay = batch_delay
        self.model = genai.GenerativeModel(model_name)

    def process_batch(self, items: List[Any],
                     processor_func: Callable,
                     show_progress: bool = True) -> List[Dict[str, Any]]:
        """Apply *processor_func* to every item, batch by batch.

        Returns:
            One dict per item, in input order: ``{'item', 'result', 'success': True}``
            on success or ``{'item', 'error', 'success': False}`` if the call raised.
        """
        results = []
        total = len(items)
        total_batches = (total + self.batch_size - 1) // self.batch_size

        print(f"📦 開始批次處理 {total} 個項目 (批次大小: {self.batch_size})")

        for batch_num, start in enumerate(range(0, total, self.batch_size), 1):
            batch = items[start:start + self.batch_size]

            if show_progress:
                print(f"🔄 處理批次 {batch_num}/{total_batches}...")

            batch_start = time.perf_counter()

            for item in batch:
                try:
                    result = processor_func(item)
                except Exception as e:
                    results.append({
                        'item': item,
                        'error': str(e),
                        'success': False
                    })
                else:
                    results.append({
                        'item': item,
                        'result': result,
                        'success': True
                    })

            batch_time = time.perf_counter() - batch_start

            if show_progress:
                print(f"✅ 批次 {batch_num} 完成 (耗時: {batch_time:.2f}秒)")

            # Pause between batches (not after the last) to avoid API rate limits.
            if start + self.batch_size < total and self.batch_delay > 0:
                time.sleep(self.batch_delay)

        success_count = sum(1 for r in results if r['success'])
        print(f"📊 批次處理完成: {success_count}/{total} 成功")

        return results

    def batch_analyze_texts(self, texts: List[str],
                           analysis_type: str = "sentiment") -> List[Dict]:
        """Ask the model to analyse each text; unknown types fall back to "sentiment".

        Model/API errors propagate out of the per-item call so that
        ``process_batch`` records them as ``success: False``.
        """
        templates = {
            "sentiment": "分析以下文字的情感傾向(正面/中性/負面):{text}",
            "summary": "用一句話總結:{text}",
            "category": "將以下文字分類:{text}"
        }
        template = templates.get(analysis_type, templates["sentiment"])

        def analyze_single(text: str) -> Dict:
            response = self.model.generate_content(template.format(text=text))
            return {'analysis': response.text}

        return self.process_batch(texts, analyze_single)

4. 並行工作流程 (workflows/parallel_workflow.py)

from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Dict, Any
from core.parallel_executor import ParallelExecutor, ParallelTask
from optimizers.cache_manager import CacheManager
import google.generativeai as genai
import os
import time

# Configure the Gemini SDK once at import time; the key is read from the
# GEMINI_API_KEY environment variable (None when it is not set).
genai.configure(api_key=os.environ.get('GEMINI_API_KEY'))

class ParallelWorkflowState(TypedDict):
    """State passed between the nodes of the parallel LangGraph workflow."""
    user_queries: List[str]  # queries to answer, in display order
    parallel_results: Dict[str, Any]  # "query_<index>" -> answer text
    aggregated_response: str  # final formatted summary
    execution_time: float  # wall-clock seconds spent in the parallel step


class ParallelWorkflow:
    """LangGraph workflow that answers several queries concurrently, with caching."""

    def __init__(self, max_workers: int = 5, cache_ttl: int = 300,
                 query_timeout: float = 15.0):
        """
        Args:
            max_workers: Thread-pool size used to run queries concurrently.
            cache_ttl: Seconds a model answer stays in the cache.
            query_timeout: Timeout (seconds) given to each query task.
        """
        self.executor = ParallelExecutor(max_workers=max_workers)
        self.cache = CacheManager(default_ttl=cache_ttl)
        self.cache_ttl = cache_ttl
        self.query_timeout = query_timeout
        self.model = genai.GenerativeModel('gemini-2.5-flash')

    def create_workflow(self):
        """Build and compile the two-node graph: process_parallel -> aggregate_results."""
        workflow = StateGraph(ParallelWorkflowState)

        workflow.add_node("process_parallel", self.process_queries_parallel)
        workflow.add_node("aggregate_results", self.aggregate_results_node)

        workflow.set_entry_point("process_parallel")
        workflow.add_edge("process_parallel", "aggregate_results")
        workflow.add_edge("aggregate_results", END)

        return workflow.compile()

    def process_queries_parallel(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
        """Run every query in ``state["user_queries"]`` through the executor concurrently.

        Results are stored under ``query_<index>``; executor-level failures
        (e.g. timeouts) are stored there too as "處理失敗: ..." messages.
        """
        queries = state["user_queries"]

        print(f"\n🚀 準備並行處理 {len(queries)} 個查詢...")

        tasks = [
            ParallelTask(
                id=f"query_{i}",
                name=f"處理查詢: {query[:30]}...",
                function=self.process_single_query_with_cache,
                args=(query,),
                timeout=self.query_timeout
            )
            for i, query in enumerate(queries)
        ]

        start_time = time.time()
        execution_result = self.executor.execute_parallel(tasks)
        execution_time = time.time() - start_time

        # Surface failures in the same form process_single_query_with_cache
        # uses, instead of letting them appear as a bare "無結果" later.
        parallel_results = {
            task_id: f"處理失敗: {message}"
            for task_id, message in execution_result.get('errors', {}).items()
        }
        parallel_results.update(execution_result['results'])

        return {
            **state,
            "parallel_results": parallel_results,
            "execution_time": execution_time
        }

    def process_single_query_with_cache(self, query: str) -> str:
        """Answer *query* from the cache, or ask the model and cache the answer.

        Model errors are returned as "處理失敗: ..." text and are not cached.
        """
        cache_key = f"query_{query}"

        cached_result = self.cache.get(cache_key)
        if cached_result is not None:  # an empty cached answer is still a hit
            return cached_result

        try:
            response = self.model.generate_content(query)
            result = response.text
        except Exception as e:
            return f"處理失敗: {str(e)}"

        self.cache.set(cache_key, result, ttl=self.cache_ttl)
        return result

    def aggregate_results_node(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
        """Format every query with a 200-character answer preview plus timing and cache stats."""
        results = state["parallel_results"]
        queries = state["user_queries"]

        print("\n📊 聚合處理結果...")

        parts = ["🎯 **並行處理結果總覽**\n\n"]

        for i, query in enumerate(queries):
            result = str(results.get(f"query_{i}", "無結果"))
            preview = result[:200] + ('...' if len(result) > 200 else '')

            parts.append(f"**問題 {i+1}:** {query}\n")
            parts.append(f"**回答:** {preview}\n\n")
            parts.append("-" * 50 + "\n\n")

        execution_time = state["execution_time"]
        # Guard against an empty query list (would divide by zero).
        average_time = execution_time / len(queries) if queries else 0.0
        parts.append(f"\n⏱️ 總執行時間: {execution_time:.2f} 秒\n")
        parts.append(f"📈 平均每查詢: {average_time:.2f} 秒\n")

        cache_stats = self.cache.get_stats()
        parts.append(f"💾 快取命中率: {cache_stats['hit_rate']:.1%}\n")

        return {
            **state,
            "aggregated_response": "".join(parts)
        }

5. 效能監控器 (utils/performance_monitor.py)

import time
from functools import wraps
from typing import Callable, Dict, Any
import statistics

class PerformanceMonitor:
    """Record wall-clock durations of decorated functions and summarise them."""

    def __init__(self):
        # Metric name -> list of recorded durations in seconds.
        self.metrics = {}

    def measure_time(self, func_name: str = None):
        """Decorator factory that times every call of the decorated function.

        Args:
            func_name: Metric name; defaults to the function's ``__name__``.

        Durations are measured with ``time.perf_counter`` (monotonic, high
        resolution) and are recorded even when the call raises.
        """
        def decorator(func: Callable):
            name = func_name or func.__name__
            # Register the name up front so it is known before the first call.
            self.metrics.setdefault(name, [])

            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.perf_counter()
                try:
                    return func(*args, **kwargs)
                finally:
                    execution_time = time.perf_counter() - start_time
                    self.metrics.setdefault(name, []).append(execution_time)
                    print(f"⏱️ {name} 執行時間: {execution_time:.3f} 秒")

            return wrapper
        return decorator

    def get_statistics(self, func_name: str = None) -> Dict[str, Any]:
        """Return stats for *func_name*, or for every metric that has samples.

        Returns ``{}`` when *func_name* is unknown or has no recorded calls.
        """
        if func_name:
            times = self.metrics.get(func_name)
            if not times:
                return {}
            return self._calculate_stats(func_name, times)

        return {
            name: self._calculate_stats(name, times)
            for name, times in self.metrics.items()
            if times
        }

    def _calculate_stats(self, name: str, times: list) -> Dict[str, Any]:
        """Summarise a non-empty list of durations (count, total, mean, min, max, median)."""
        return {
            'function': name,
            'call_count': len(times),
            'total_time': sum(times),
            'avg_time': statistics.mean(times),
            'min_time': min(times),
            'max_time': max(times),
            'median_time': statistics.median(times)
        }

    def print_report(self):
        """Print a per-function timing report to stdout."""
        print("\n📊 效能報告")
        print("=" * 60)

        all_stats = self.get_statistics()

        for func_name, stats in all_stats.items():
            print(f"\n函數: {func_name}")
            print(f"  呼叫次數: {stats['call_count']}")
            print(f"  總時間: {stats['total_time']:.3f} 秒")
            print(f"  平均時間: {stats['avg_time']:.3f} 秒")
            print(f"  最小時間: {stats['min_time']:.3f} 秒")
            print(f"  最大時間: {stats['max_time']:.3f} 秒")
            print(f"  中位數: {stats['median_time']:.3f} 秒")

6. 主程式 (main.py)

from workflows.parallel_workflow import ParallelWorkflow
from utils.performance_monitor import PerformanceMonitor

def _initial_state(queries):
    """Return a fresh workflow state for *queries* with all other fields empty."""
    return {
        "user_queries": queries,
        "parallel_results": {},
        "aggregated_response": "",
        "execution_time": 0.0
    }


def main():
    """Interactive demo of parallel processing and performance optimisation.

    Runs a menu loop that sends preset or custom query batches through the
    parallel workflow and shows cache, executor and timing statistics.
    Ctrl+C or end-of-input at any prompt exits cleanly.
    """
    print("⚡ 並行處理與效能優化系統")
    print("🚀 支援多任務並行、快取機制、批次處理")
    print("=" * 55)

    workflow_manager = ParallelWorkflow()
    workflow = workflow_manager.create_workflow()
    monitor = PerformanceMonitor()

    # Defined once so preset and custom runs share the same timing metric.
    @monitor.measure_time("parallel_workflow")
    def run_workflow(queries):
        return workflow.invoke(_initial_state(queries))

    test_query_sets = {
        "多領域查詢": [
            "Python 的主要特性是什麼?",
            "機器學習的基本概念",
            "什麼是 REST API?"
        ],
        "資料分析": [
            "如何進行數據清洗?",
            "常見的統計方法有哪些?",
            "資料視覺化的最佳實踐"
        ],
        "程式設計": [
            "什麼是設計模式?",
            "如何優化程式效能?",
            "單元測試的重要性"
        ]
    }

    while True:
        print("\n選擇操作:")
        print("1. 執行預設並行查詢測試")
        print("2. 自訂並行查詢")
        print("3. 查看快取統計")
        print("4. 查看效能報告")
        print("5. 清除快取")
        print("6. 退出")

        try:
            # input() lives inside the try so Ctrl+C / EOF at the menu exits cleanly.
            choice = input("\n請選擇 (1-6):").strip()

            if choice == '1':
                print("\n📋 可用的測試集:")
                test_sets = list(test_query_sets.keys())
                for i, name in enumerate(test_sets, 1):
                    print(f"{i}. {name}")

                set_num = input(f"選擇測試集 (1-{len(test_sets)}):").strip()

                if set_num.isdigit() and 1 <= int(set_num) <= len(test_sets):
                    set_name = test_sets[int(set_num) - 1]
                    queries = test_query_sets[set_name]

                    print(f"\n🧪 執行測試:{set_name}")
                    print(f"查詢數量:{len(queries)}")

                    print("\n" + "="*55)
                    result = run_workflow(queries)
                    print("="*55)
                    print(f"\n{result['aggregated_response']}")

                else:
                    print("❌ 無效的選擇")

            elif choice == '2':
                print("\n📝 自訂並行查詢")
                print("請輸入多個問題(每行一個,輸入空行結束):")

                custom_queries = []
                while True:
                    query = input().strip()
                    if not query:
                        break
                    custom_queries.append(query)

                if custom_queries:
                    result = run_workflow(custom_queries)
                    print(f"\n{result['aggregated_response']}")
                else:
                    print("❌ 沒有輸入查詢")

            elif choice == '3':
                cache_stats = workflow_manager.cache.get_stats()
                print("\n💾 快取統計資訊:")
                print(f"快取大小:{cache_stats['cache_size']}")
                print(f"命中次數:{cache_stats['hit_count']}")
                print(f"未命中次數:{cache_stats['miss_count']}")
                print(f"命中率:{cache_stats['hit_rate']:.1%}")

            elif choice == '4':
                monitor.print_report()

                executor_stats = workflow_manager.executor.get_stats()
                print("\n⚡ 並行執行統計:")
                print(f"總任務數:{executor_stats['total_tasks']}")
                print(f"成功任務:{executor_stats['successful_tasks']}")
                print(f"失敗任務:{executor_stats['failed_tasks']}")
                print(f"成功率:{executor_stats['success_rate']:.1%}")
                print(f"平均執行時間:{executor_stats['average_time']:.3f} 秒")

            elif choice == '5':
                workflow_manager.cache.clear_all()
                print("✅ 快取已清除")

            elif choice == '6':
                print("👋 再見!")
                break

            else:
                print("❌ 無效的選擇")

        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin closed (e.g. piped input ran out) - treat as exit.
            print("\n👋 再見!")
            break
        except Exception as e:
            print(f"❌ 發生錯誤:{e}")


if __name__ == "__main__":
    main()

🎯 系統特色

多執行緒並行:使用 ThreadPoolExecutor 同時處理多個任務(特別適合 API 呼叫等 I/O 密集型任務;受 GIL 限制,CPU 密集型任務宜改用多行程)
智能快取機制:減少重複計算,提升回應速度
批次處理:將大量資料分批處理,並在批次之間稍作停頓以避免 API 限流
效能監控:即時追蹤各函數的執行時間
容錯處理:單一任務失敗不影響其他任務

🚀 效能對比

順序執行 vs 並行執行:

順序執行 3 個查詢:
查詢1: 2.5秒
查詢2: 2.3秒  
查詢3: 2.4秒
總時間: 7.2秒

並行執行 3 個查詢:
同時執行...
總時間: 2.6秒

效能提升: 約 177%(速度約為原本的 2.8 倍,總時間縮短約 64%)🚀

今天我們學習了並行處理和效能優化技術,讓 AI 助理能夠更快速地處理多個任務。明天我們將探索人機協作介面設計,提升互動體驗!


上一篇
Day 16: 條件判斷與分支邏輯
下一篇
Day 18: 人機協作介面設計
系列文
30 天從零到 AI 助理:Gemini CLI 與 LangGraph 輕鬆上手21
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言